package com.atguigu.flink.datastreamapi.source;

import com.atguigu.flink.pojo.WaterSensor;
import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Demo of the DataGen connector: builds a {@link DataGeneratorSource} that produces random
 * {@link WaterSensor} records and prints them to standard output.
 *
 * Created by Smexy on 2023/11/11
 */
public class Demo6_DataGen
{
    /**
     * Builds the generator source, attaches it to a streaming environment and runs the job.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(20);

        /*
            DataGeneratorSource(
                GeneratorFunction<Long, OUT> generatorFunction : function producing values of type OUT
                long count                                     : total number of records to generate;
                                                                 Long.MAX_VALUE is practically endless,
                                                                 i.e. an unbounded stream
                RateLimiterStrategy rateLimiterStrategy        : how fast records are generated
                TypeInformation<OUT> typeInfo                  : type information for OUT
            )

            Each generated record has:
                id: s1 -- s10
                ts: current time
                vc: 10 -- 1000
         */
        GeneratorFunction<Long, WaterSensor> sensorGenerator = index -> {
            // The incoming index is ignored; every field is drawn fresh for each record.
            String sensorId = "s" + RandomUtils.nextInt(1, 11);
            long timestamp = System.currentTimeMillis();
            int waterLevel = RandomUtils.nextInt(10, 1001);
            return new WaterSensor(sensorId, timestamp, waterLevel);
        };

        long recordCount = Long.MAX_VALUE;
        RateLimiterStrategy rateLimit = RateLimiterStrategy.perSecond(40d);
        TypeInformation<WaterSensor> sensorType = TypeInformation.of(WaterSensor.class);

        DataGeneratorSource<WaterSensor> source =
            new DataGeneratorSource<>(sensorGenerator, recordCount, rateLimit, sensorType);

        // No watermarks are assigned for this source.
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "datagen").print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
